/*
 *  Copyright 1999-2019 Seata.io Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package io.seata.rm.datasource;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;

import io.seata.common.thread.NamedThreadFactory;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import io.seata.core.model.Resource;
import io.seata.rm.DefaultResourceManager;
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import io.seata.rm.datasource.util.JdbcUtils;
import io.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.seata.common.DefaultValues.DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE;
import static io.seata.common.DefaultValues.DEFAULT_TABLE_META_CHECKER_INTERVAL;

/**
 * Data source proxy.
 *
 * <p>Wraps a target {@link DataSource}, hands out {@link ConnectionProxy} instances instead of
 * plain connections, and registers itself as a {@link Resource} with the
 * {@link DefaultResourceManager} during construction.
 */
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxy.class);

    private static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT";

    private String resourceGroupId;

    private String jdbcUrl;

    private String dbType;

    private String userName;

    /**
     * Whether the periodic table meta check is enabled.
     * Read from the configuration key {@link ConfigurationKeys#CLIENT_TABLE_META_CHECK_ENABLE};
     * falls back to {@code DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE} when not configured.
     */
    private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(
        ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);

    /**
     * Interval, in milliseconds, between two table meta checks.
     * Read from the configuration key {@link ConfigurationKeys#CLIENT_TABLE_META_CHECKER_INTERVAL};
     * falls back to {@code DEFAULT_TABLE_META_CHECKER_INTERVAL} when not configured.
     */
    private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(
            ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);

    /**
     * Single-threaded scheduler that runs the periodic table meta refresh of this proxy.
     */
    private final ScheduledExecutorService tableMetaExcutor = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("tableMetaChecker", 1, true));

    /**
     * Creates a data source proxy that belongs to the default resource group.
     *
     * @param targetDataSource the data source to proxy
     */
    public DataSourceProxy(DataSource targetDataSource) {
        this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
    }

    /**
     * Creates a data source proxy.
     *
     * @param targetDataSource the data source to proxy; if it is itself a
     *                         {@link SeataDataSourceProxy}, its underlying target is proxied instead
     * @param resourceGroupId  the resource group id
     * @throws IllegalStateException if the connection metadata cannot be read from the data source
     */
    public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        // Avoid proxying a proxy: unwrap down to the real target data source.
        if (targetDataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }

        this.targetDataSource = targetDataSource;

        init(targetDataSource, resourceGroupId);
    }

    /**
     * Initializes the proxy: reads the connection metadata, registers this resource with the
     * resource manager, optionally schedules the table meta refresh and sets the default branch type.
     *
     * @param dataSource      the (unwrapped) target data source
     * @param resourceGroupId the resource group id
     * @throws IllegalStateException if a {@link SQLException} occurs while reading the metadata
     */
    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        try (Connection connection = dataSource.getConnection()) {
            // The JDBC URL is the basis of the resource id; the db type is derived from it.
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);

            // For Oracle the user name is kept as well; it becomes part of the resource id.
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }

        // Register this proxy as a resource with the resource manager.
        DefaultResourceManager.get().registerResource(this);

        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            // Refresh the table meta cache periodically, starting immediately.
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                try (Connection connection = dataSource.getConnection()) {
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception e) {
                    // Best effort: the exception must not escape, otherwise scheduleAtFixedRate
                    // suppresses all subsequent runs. It is logged so that a persistently failing
                    // refresh does not go unnoticed.
                    LOGGER.warn("Failed to refresh the table meta cache", e);
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        // Set the default branch type to 'AT' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

    /**
     * Gets plain connection.
     *
     * @return a connection obtained directly from the target data source, without proxying
     * @throws SQLException the sql exception
     */
    public Connection getPlainConnection() throws SQLException {
        return targetDataSource.getConnection();
    }

    /**
     * Gets db type.
     *
     * @return the db type derived from the JDBC URL during initialization
     */
    public String getDbType() {
        return dbType;
    }

    /**
     * Obtains a connection from the target data source and wraps it in a {@link ConnectionProxy}.
     *
     * @return the connection proxy
     * @throws SQLException if the target data source fails to provide a connection
     */
    @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

    /**
     * Obtains a connection for the given credentials from the target data source and wraps it
     * in a {@link ConnectionProxy}.
     *
     * @param username the database user
     * @param password the user's password
     * @return the connection proxy
     * @throws SQLException if the target data source fails to provide a connection
     */
    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public String getResourceGroupId() {
        return resourceGroupId;
    }

    /**
     * Builds the resource id from the JDBC URL.
     *
     * <ul>
     * <li>PostgreSQL: the URL without query string, plus the {@code currentSchema} parameter if present;</li>
     * <li>Oracle with a known user name: the URL without query string, plus {@code "/" + userName};</li>
     * <li>otherwise: the URL without query string.</li>
     * </ul>
     *
     * @return the resource id
     */
    @Override
    public String getResourceId() {
        if (JdbcConstants.POSTGRESQL.equals(dbType)) {
            return getPGResourceId();
        } else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {
            return getDefaultResourceId() + "/" + userName;
        } else {
            return getDefaultResourceId();
        }
    }

    /**
     * get the default resource id: the JDBC URL with everything from the first '?' removed
     * @return resource id
     */
    private String getDefaultResourceId() {
        int queryStart = jdbcUrl.indexOf('?');
        return queryStart < 0 ? jdbcUrl : jdbcUrl.substring(0, queryStart);
    }

    /**
     * prevent pg sql url like
     * jdbc:postgresql://127.0.0.1:5432/seata?currentSchema=public
     * jdbc:postgresql://127.0.0.1:5432/seata?currentSchema=seata
     * cause the duplicated resourceId
     * it will cause the problem like
     * 1.get file lock fail
     * 2.error table meta cache
     *
     * The query string is dropped except for the first parameter containing "currentSchema".
     * @return resourceId
     */
    private String getPGResourceId() {
        int queryStart = jdbcUrl.indexOf('?');
        if (queryStart < 0) {
            return jdbcUrl;
        }

        String baseUrl = jdbcUrl.substring(0, queryStart);
        String[] urlParams = jdbcUrl.substring(queryStart + 1).split("&");
        for (String urlParam : urlParams) {
            // Only the first matching parameter is kept.
            if (urlParam.contains("currentSchema")) {
                return baseUrl + "?" + urlParam;
            }
        }
        return baseUrl;
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.AT;
    }
}
